Kinesis Streamsと別リージョンのFirehoseをLambdaで連携させてみた
はじめに
AWSチームのすずきです。
AWSがストリーミングデータ向けプラットフォームとして提供するAmazon Kinesis。
2014年に現在のKinesis Streamsがリリースされた後、2015年10月にS3、Redshift、ElastiserchServiceへの自動エクスポートを備えたKinesis Firehose、 2016年8月にはCEP(Complex Event Processing:複合イベント処理)エンジンとしてKinesis Analyticsがリリースされました。
ただ2016年9月現在、東京リージョンで利用する事のできるKinesisはStreamsのみ。 Firehoseと、AnalyticsはUS(オレゴン、バージニア)、EU(アイルランド)リージョンを利用する必要があります。
今回、東京リージョンのKinesis Streamsに届くデータを、オレゴンリージョンのFirehoseとAnalyticsでの処理すべく、Lambdaを利用した連携を行う機会がありましたので、 その内容について紹介させて頂きます。
概要図
- 東京リージョンにLambdaを設置し、Kinesis Streamsのイベントトリガとして、Kinesisに登録されたデータをオレゴンリージョンのFirehoseに転送を実現しました。
設定手順
IAM設定
- Lambda関数にAWSのIAM権限を付与するため、「AWS Lambda」用のAWSサービスロールを作成します。
- LambdaのBluePrint、「kinesis-process-record-python」用のロールを元にfirehoseのIAM権限の付与を行いました。
- 今回、検証用の環境であったため対象リソースはワイルドカードを利用しましたが、他システムと共存する環境では、適切なARN指定を行う事をおすすめします。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "*" }, { "Effect": "Allow", "Action": "kinesis:ListStreams", "Resource": "*" }, { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator" ], "Resource": "*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "*" ] }, { "Effect": "Allow", "Action": [ "firehose:DescribeDeliveryStream", "firehose:ListDeliveryStreams", "firehose:PutRecord", "firehose:PutRecordBatch" ], "Resource": [ "*" ] } ] }
Firehose
- 今回、Kinesis Firehose はオレゴン(us-west-2)を利用しました。
- Firehoseの設定は、希望するデータ出力先に応じて実施してください。
S3
Redshift
Elasticsearch
Lambda
ファンクション作成
- 「Lambda」のAWSコンソールより、「Create a Lambda function」を実施します。
- BluePrintの指定はSkipします。
トリガ設定
- 連携元となる「Kinesis Streams」を指定します。
- Batch Sizeは500、Put-Records APIの上限に合わせます
- 1件のレコードサイズが平均2KBを超える場合、1回のバッチPut容量が1MB以下になる様にBatch Sizeを減らす必要があります。
- Stating Posionは「Trim Horizon」、過去に登録されたレコード(標準では24時間)もFirehoseの転送対象とします。
- Enable trigger は後ほど有効とします。
ファンクション設定
- 任意のファンクション名、説明コメントを記載します
- 今回、「Python」を利用したため、Runtimeは「Python2.7」を指定します。
- 「Code entry type」は、「Edit code inline」下記のコードを利用しました。
- 出力先のFirehose情報(region、DeliveryStreamname)は適宜修正します。
lambda-kinesis-event-stream-to-firehose.py
- Handlerは「Python2.7」デフォルトをそのまま利用します。
- Roleは、先に作成したロールを指定します
- 割当メモリは、デフォルト(128MB)とし、稼働状況をみて増強するものとします。
- 通常1回の処理、500ms程度ですが、タイムアウトは1分まで延長しました。
- VPCは利用しません。
トリガー有効化
- 「Enable」操作を実施します。
動作確認
イベントトリガのステータス
- 「Last result: OK」となる事を確認します
CloudwatchLogs
- CloudwatchLogs で、異常出力のない事を確認します
Monitoring
- Errors、Throttlesが発生していない事を確認します
S3 (Firehose)
- Firehoseの出力先として設定したS3のファイル存在と内容確認を行います。
AWS費用試算
下記のほぼ毎秒、小容量のデータが絶え間なく到着する環境で、LambdaによるFirehose転送を実施した場合の費用を試算した所、日額$0.036、月額1.1$程度の計算になりました。
Lambdaには無料枠(毎月最初の 1,000,000 件、400,000 GB-秒のコンピューティング時間)も存在するので、低コストでの維持が出来ると予想できます。
1日コスト
- Lambda割当メモリ : 128MB
- 平均実行時間 : 300ms
- 1日実行回数 : 10万回
- 1日転送バイト : 20MB
Lambda
- ($0.000000208(100 ミリ秒単位の価格) × 3(300ミリ秒分) + ($0.0000002(1リクエスト価格) × 100,000(1日リクエスト数)) = $0.020000624
Kinesis Firehose
- ($0.035 (1GB単価) × 0.02(GB) = $0.0007
- US価格
ネットワーク
- $0.02 /GB × 0.04(GB) = $0.0008
- 別AWSリージョンのOutbound通信費
CloudwatchLogs
- ($0.76 (1GB単価)) × 0.002(GB) = $0.0152
まとめ
Lambdaを利用する事で、東京リージョンのKinesis Streamsと海外のオレゴンリージョンのKinesis Firehoseを簡単に連携させる事ができました。
ログファイルの圧縮やRedshift展開まで、フルマネージドで実施してくれるKinesis Firehoseや、強力なCEPエンジンとして利用できるKinesis Analyticsの評価に お試しください。